<!DOCTYPE html>
<html lang="en">

<head>
	<meta charset="UTF-8">
	<meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1.0, user-scalable=no">
	<meta name="keywords" content="Seata, distributed transaction, AT mode" />
	<meta name="description" content="Seata source code analysis series" />
	<!-- Page tab title -->
	<title>Source Code Analysis of Seata AT Mode Startup</title>
  <link rel="shortcut icon" href="/img/seata_logo_small.jpeg"/>
	<link rel="stylesheet" href="/build/blogDetail.css" />
</head>
<body>
	<div id="root"><div class="blog-detail-page" data-reactroot=""><header class="header-container header-container-normal"><div class="header-body"><a href="/zh-cn/index.html"><img class="logo" src="/img/seata_logo.png"/></a><div class="search search-normal"><span class="icon-search"></span></div><span class="language-switch language-switch-normal">En</span><div class="header-menu"><img class="header-menu-toggle" src="/img/system/menu_gray.png"/><ul><li class="menu-item menu-item-normal"><a href="/zh-cn/index.html" target="_self">Home</a></li><li class="menu-item menu-item-normal"><a href="/zh-cn/docs/overview/what-is-seata.html" target="_self">Docs</a></li><li class="menu-item menu-item-normal"><a href="/zh-cn/docs/developers/developers_dev.html" target="_self">Developers</a></li><li class="menu-item menu-item-normal menu-item-normal-active"><a href="/zh-cn/blog/index.html" target="_self">Blog</a></li><li class="menu-item menu-item-normal"><a href="/zh-cn/community/index.html" target="_self">Community</a></li><li class="menu-item menu-item-normal"><a href="/zh-cn/blog/download.html" target="_self">Download</a></li></ul></div></div></header><section class="blog-content markdown-body"><h1>Preface</h1>
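<p>The previous article, "<a href="https://mp.weixin.qq.com/s/Pypkm5C9aLPJHYwcM6tAtA">Design Principles of the Seata Distributed Transaction Middleware</a>", covered some of the design principles behind Seata AT mode and introduced its three roles (RM, TM, and TC). From here on I will be publishing a series of Seata source code analysis articles. This one looks at what Seata AT mode does at startup.</p>
<h1>Client startup logic</h1>
<p>The TM manages the global transaction as a whole, so a global transaction is always started by the TM. The TM exposes a global management interface, GlobalTransaction, structured as follows:</p>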
<p>io.seata.tm.api.GlobalTransaction</p>
<pre><code class="language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">interface</span> <span class="hljs-title">GlobalTransaction</span> </span>{

  <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">begin</span><span class="hljs-params">()</span> <span class="hljs-keyword">throws</span> TransactionException</span>;

  <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">begin</span><span class="hljs-params">(<span class="hljs-keyword">int</span> timeout)</span> <span class="hljs-keyword">throws</span> TransactionException</span>;

  <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">begin</span><span class="hljs-params">(<span class="hljs-keyword">int</span> timeout, String name)</span> <span class="hljs-keyword">throws</span> TransactionException</span>;

  <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">commit</span><span class="hljs-params">()</span> <span class="hljs-keyword">throws</span> TransactionException</span>;

  <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">rollback</span><span class="hljs-params">()</span> <span class="hljs-keyword">throws</span> TransactionException</span>;
  
  <span class="hljs-function">GlobalStatus <span class="hljs-title">getStatus</span><span class="hljs-params">()</span> <span class="hljs-keyword">throws</span> TransactionException</span>;
  
  <span class="hljs-comment">// ...</span>
}
</code></pre>
<p>A GlobalTransaction can be created through GlobalTransactionContext and then used to begin, commit, or roll back the global transaction, so Seata AT mode can be used directly through the API:</p>
<pre><code class="language-java"><span class="hljs-comment">//init seata;</span>
TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);
<span class="hljs-comment">//trx</span>
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
<span class="hljs-keyword">try</span> {
  tx.begin(<span class="hljs-number">60000</span>, <span class="hljs-string">"testBiz"</span>);
  <span class="hljs-comment">// transaction processing</span>
  <span class="hljs-comment">// ...</span>
  tx.commit();
} <span class="hljs-keyword">catch</span> (Exception exx) {
  tx.rollback();
  <span class="hljs-keyword">throw</span> exx;
}
</code></pre>
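<p>Writing this out for every global transaction inevitably produces redundant code. Our projects are built on the Spring container, so we can use Spring AOP together with the template pattern to wrap this boilerplate in a template, which is also what Mybatis-spring does. Next, let's look at what a Spring-based project does when it starts Seata and registers global transactions.</p>
<p>A global transaction is enabled by adding the <code>@GlobalTransactional</code> annotation to a method. Seata's Spring module contains a GlobalTransactionScanner, declared as follows:</p>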
<pre><code class="language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">GlobalTransactionScanner</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">AbstractAutoProxyCreator</span> <span class="hljs-keyword">implements</span> <span class="hljs-title">InitializingBean</span>, <span class="hljs-title">ApplicationContextAware</span>, <span class="hljs-title">DisposableBean</span> </span>{
  <span class="hljs-comment">// ...</span>
}
</code></pre>
<p>When a Spring-based project starts, this class goes through the following initialization flow:</p>
<p><img src="https://gitee.com/objcoding/md-picture/raw/master/img/image-20191124155455309.png" alt="image-20191124155455309"></p>
<p>The afterPropertiesSet() method of InitializingBean calls initClient():</p>
<p>io.seata.spring.annotation.GlobalTransactionScanner#initClient</p>
<pre><code class="language-java">TMClient.init(applicationId, txServiceGroup);
RMClient.init(applicationId, txServiceGroup);
</code></pre>
<p>This initializes both the TM and the RM.</p>
<ul>
<li>TM initialization</li>
</ul>
<p>io.seata.tm.TMClient#init</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">init</span><span class="hljs-params">(String applicationId, String transactionServiceGroup)</span> </span>{
  <span class="hljs-comment">// get the TmRpcClient instance</span>
  TmRpcClient tmRpcClient = TmRpcClient.getInstance(applicationId, transactionServiceGroup);
  <span class="hljs-comment">// initialize the TM client</span>
  tmRpcClient.init();
}
</code></pre>
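<p>TmRpcClient.getInstance() returns a TM client instance. While obtaining it, Seata creates the Netty client configuration object and the messageExecutor thread pool, which handles the various message exchanges with the server. Creating the TmRpcClient instance also creates a ClientBootstrap, which manages starting and stopping the Netty service, and a ClientChannelManager, which manages the Netty client object pool. Seata's Netty layer works together with an object pool; this will be covered when we analyze the network module.</p>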
<p>io.seata.core.rpc.netty.AbstractRpcRemotingClient#init</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">init</span><span class="hljs-params">()</span> </span>{
  clientBootstrap.start();
  <span class="hljs-comment">// periodically try to connect to the server</span>
  timerExecutor.scheduleAtFixedRate(<span class="hljs-keyword">new</span> Runnable() {
    <span class="hljs-meta">@Override</span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">run</span><span class="hljs-params">()</span> </span>{
      clientChannelManager.reconnect(getTransactionServiceGroup());
    }
  }, SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);
  mergeSendExecutorService = <span class="hljs-keyword">new</span> ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                                                    MAX_MERGE_SEND_THREAD,
                                                    KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                                                    <span class="hljs-keyword">new</span> LinkedBlockingQueue&lt;&gt;(),
                                                    <span class="hljs-keyword">new</span> NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
  mergeSendExecutorService.submit(<span class="hljs-keyword">new</span> MergedSendRunnable());
  <span class="hljs-keyword">super</span>.init();
}
</code></pre>
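<p>Calling init() on the TM client ends up starting the Netty client (it is not really started at this point; that happens only when the object pool is first used). It also starts a scheduled task that periodically resends a RegisterTMRequest (the RM client sends a RegisterRMRequest) to try to connect to the server. Concretely, NettyClientChannelManager caches client channels in channels; if a channel is missing or has expired, the client tries to connect to the server to obtain a new channel and caches it in channels. Finally, it starts a separate thread for sending asynchronous requests. This is a neat design that will be analyzed in detail along with the network module.</p>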
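<p>The "connect only if the cached channel is missing or no longer usable" idea can be sketched as follows. This is a simplified illustration, not Seata's implementation: FakeChannel, CHANNELS, and reconnect() are hypothetical names, the server addresses are made up, and a plain function stands in for the Netty connection logic. In Seata the equivalent pass is triggered by the scheduled task shown above.</p>
<pre><code class="language-java">import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class ReconnectSketch {

    /** Hypothetical stand-in for a Netty channel: just an address and a liveness flag. */
    static final class FakeChannel {
        final String address;
        volatile boolean active = true;

        FakeChannel(String address) {
            this.address = address;
        }
    }

    // server address -&gt; cached channel
    static final Map&lt;String, FakeChannel&gt; CHANNELS = new ConcurrentHashMap&lt;&gt;();

    /** One reconnect pass: returns how many connections were (re)established. */
    static int reconnect(Iterable&lt;String&gt; serverAddresses, Function&lt;String, FakeChannel&gt; connector) {
        int connected = 0;
        for (String address : serverAddresses) {
            FakeChannel cached = CHANNELS.get(address);
            if (cached != null &amp;&amp; cached.active) {
                continue; // a healthy channel is already cached
            }
            // missing or dead: connect again and cache the new channel
            CHANNELS.put(address, connector.apply(address));
            connected++;
        }
        return connected;
    }

    public static void main(String[] args) {
        List&lt;String&gt; servers = Arrays.asList("10.0.0.1:8091", "10.0.0.2:8091");
        System.out.println("first pass connected: " + reconnect(servers, FakeChannel::new));
        System.out.println("second pass connected: " + reconnect(servers, FakeChannel::new));
    }
}
</code></pre>
<p>Because each pass skips channels that are still healthy, running it on a fixed schedule is cheap when nothing has changed.</p>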
<p>io.seata.core.rpc.netty.AbstractRpcRemoting#init</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">init</span><span class="hljs-params">()</span> </span>{
  timerExecutor.scheduleAtFixedRate(<span class="hljs-keyword">new</span> Runnable() {
    <span class="hljs-meta">@Override</span>
    <span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">void</span> <span class="hljs-title">run</span><span class="hljs-params">()</span> </span>{
      <span class="hljs-keyword">for</span> (Map.Entry&lt;Integer, MessageFuture&gt; entry : futures.entrySet()) {
        <span class="hljs-keyword">if</span> (entry.getValue().isTimeout()) {
          futures.remove(entry.getKey());
          entry.getValue().setResultMessage(<span class="hljs-keyword">null</span>);
          <span class="hljs-keyword">if</span> (LOGGER.isDebugEnabled()) {
            LOGGER.debug(<span class="hljs-string">"timeout clear future: {}"</span>, entry.getValue().getRequestMessage().getBody());
          }
        }
      }

      nowMills = System.currentTimeMillis();
    }
  }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}
</code></pre>
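<p>The init method of AbstractRpcRemoting starts yet another scheduled task. It periodically removes expired entries from futures, the map that holds the future objects of requests still waiting for a response. Each future has a timeout, after which it throws an exception, so expired future objects need to be cleared out periodically.</p>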
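<p>A single sweep of that cleanup task can be sketched on its own. This is a minimal illustration under stated assumptions: PendingFuture is a hypothetical stand-in for MessageFuture that only tracks a deadline, and the current time is passed in explicitly so the behavior is deterministic. In Seata the sweep runs inside the scheduled task shown above.</p>
<pre><code class="language-java">import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class FutureSweepSketch {

    /** Hypothetical stand-in for Seata's MessageFuture: only tracks its own deadline. */
    static final class PendingFuture {
        private final long startMillis;
        private final long timeoutMillis;

        PendingFuture(long startMillis, long timeoutMillis) {
            this.startMillis = startMillis;
            this.timeoutMillis = timeoutMillis;
        }

        boolean isTimeout(long nowMillis) {
            return nowMillis - startMillis &gt; timeoutMillis;
        }
    }

    // message id -&gt; future still waiting for a response
    static final Map&lt;Integer, PendingFuture&gt; FUTURES = new ConcurrentHashMap&lt;&gt;();

    /** One sweep: drops every future whose deadline has passed and returns how many were removed. */
    static int sweep(long nowMillis) {
        int removed = 0;
        for (Map.Entry&lt;Integer, PendingFuture&gt; entry : FUTURES.entrySet()) {
            if (entry.getValue().isTimeout(nowMillis)) {
                // ConcurrentHashMap iterators are weakly consistent, so removing while iterating is safe
                FUTURES.remove(entry.getKey());
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        FUTURES.put(1, new PendingFuture(0L, 1_000L)); // times out after 1000 ms
        FUTURES.put(2, new PendingFuture(0L, 5_000L)); // times out after 5000 ms
        System.out.println("removed: " + sweep(2_000L));
        System.out.println("remaining ids: " + FUTURES.keySet());
    }
}
</code></pre>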
<ul>
<li>RM initialization</li>
</ul>
<p>io.seata.rm.RMClient#init</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">init</span><span class="hljs-params">(String applicationId, String transactionServiceGroup)</span> </span>{
  RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
  rmRpcClient.setResourceManager(DefaultResourceManager.get());
  rmRpcClient.setClientMessageListener(<span class="hljs-keyword">new</span> RmMessageListener(DefaultRMHandler.get()));
  rmRpcClient.init();
}
</code></pre>
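<p>RmRpcClient.getInstance follows much the same logic as the TM. ResourceManager is the RM's resource manager, responsible for registering, committing, reporting, and rolling back branch transactions, and for querying global locks. DefaultResourceManager holds all the current RM resource managers and dispatches calls to them uniformly. Its get() method loads the current resource managers through an SPI-like mechanism for flexible loading: Seata scans the configuration under META-INF/services/ and loads the listed classes dynamically.</p>
<p>ClientMessageListener is the RM's message listener. It handles the instructions sent by the TC and performs branch commit, branch rollback, and undo log deletion on the branch. The final init method also works much like the TM's. DefaultRMHandler encapsulates the concrete operations of RM branch transactions.</p>
<p>Next, let's see what the wrapIfNecessary method does.</p>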
<p>io.seata.spring.annotation.GlobalTransactionScanner#wrapIfNecessary</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">protected</span> Object <span class="hljs-title">wrapIfNecessary</span><span class="hljs-params">(Object bean, String beanName, Object cacheKey)</span> </span>{
  <span class="hljs-comment">// skip proxying when global transactions are disabled</span>
  <span class="hljs-keyword">if</span> (disableGlobalTransaction) {
    <span class="hljs-keyword">return</span> bean;
  }
  <span class="hljs-keyword">try</span> {
    <span class="hljs-keyword">synchronized</span> (PROXYED_SET) {
      <span class="hljs-keyword">if</span> (PROXYED_SET.contains(beanName)) {
        <span class="hljs-keyword">return</span> bean;
      }
      interceptor = <span class="hljs-keyword">null</span>;
      <span class="hljs-comment">//check TCC proxy</span>
      <span class="hljs-keyword">if</span> (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
        <span class="hljs-comment">//TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC</span>
        interceptor = <span class="hljs-keyword">new</span> TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
      } <span class="hljs-keyword">else</span> {
        Class&lt;?&gt; serviceInterface = SpringProxyUtils.findTargetClass(bean);
        Class&lt;?&gt;[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

        <span class="hljs-comment">// check whether the bean carries the GlobalTransactional or GlobalLock annotation</span>
        <span class="hljs-keyword">if</span> (!existsAnnotation(<span class="hljs-keyword">new</span> Class[]{serviceInterface})
            &amp;&amp; !existsAnnotation(interfacesIfJdk)) {
          <span class="hljs-keyword">return</span> bean;
        }

        <span class="hljs-keyword">if</span> (interceptor == <span class="hljs-keyword">null</span>) {
          <span class="hljs-comment">// create the interceptor</span>
          interceptor = <span class="hljs-keyword">new</span> GlobalTransactionalInterceptor(failureHandlerHook);
        }
      }

      LOGGER.info(<span class="hljs-string">"Bean[{}] with name [{}] would use interceptor [{}]"</span>,
                  bean.getClass().getName(), beanName, interceptor.getClass().getName());
      <span class="hljs-keyword">if</span> (!AopUtils.isAopProxy(bean)) {
        bean = <span class="hljs-keyword">super</span>.wrapIfNecessary(bean, beanName, cacheKey);
      } <span class="hljs-keyword">else</span> {
        AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
        <span class="hljs-comment">// the bean is already an AOP proxy: add the advisors to it</span>
        Advisor[] advisor = <span class="hljs-keyword">super</span>.buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(<span class="hljs-keyword">null</span>, <span class="hljs-keyword">null</span>, <span class="hljs-keyword">null</span>));
        <span class="hljs-keyword">for</span> (Advisor avr : advisor) {
          advised.addAdvisor(<span class="hljs-number">0</span>, avr);
        }
      }
      PROXYED_SET.add(beanName);
      <span class="hljs-keyword">return</span> bean;
    }
  } <span class="hljs-keyword">catch</span> (Exception exx) {
    <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> RuntimeException(exx);
  }
}
</code></pre>
<p>GlobalTransactionScanner extends AbstractAutoProxyCreator to plug into Spring AOP. As the code shows, calls to methods annotated with GlobalTransactional or GlobalLock are intercepted by GlobalTransactionalInterceptor.</p>
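<p>The effect of annotation-driven interception can be illustrated with a plain JDK dynamic proxy. This is only a sketch of the idea: Seata relies on Spring AOP rather than a hand-written proxy, and the Transactional annotation, AccountService, and wrap() below are hypothetical names introduced for illustration.</p>
<pre><code class="language-java">import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class AnnotationProxySketch {

    /** Hypothetical marker annotation standing in for @GlobalTransactional. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Transactional {}

    interface AccountService {
        @Transactional
        String transfer(int amount);

        String balance();
    }

    static class AccountServiceImpl implements AccountService {
        public String transfer(int amount) { return "transferred " + amount; }
        public String balance() { return "balance"; }
    }

    // records what the interceptor did, so the flow can be inspected
    static final List&lt;String&gt; LOG = new ArrayList&lt;&gt;();

    /** Wraps the target in a JDK proxy that brackets annotated methods with begin/commit. */
    static AccountService wrap(AccountService target) {
        InvocationHandler handler = (proxy, method, args) -&gt; {
            if (!method.isAnnotationPresent(Transactional.class)) {
                return method.invoke(target, args); // not annotated: pass straight through
            }
            LOG.add("begin " + method.getName());
            Object result = method.invoke(target, args);
            LOG.add("commit " + method.getName());
            return result;
        };
        return (AccountService) Proxy.newProxyInstance(
            AccountService.class.getClassLoader(), new Class&lt;?&gt;[] {AccountService.class}, handler);
    }

    public static void main(String[] args) {
        AccountService service = wrap(new AccountServiceImpl());
        System.out.println(service.transfer(10));
        System.out.println(service.balance());
        System.out.println(LOG);
    }
}
</code></pre>
<p>Only transfer() is bracketed by begin and commit; balance() goes straight to the target. Error handling is left out of the sketch for brevity.</p>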
<p>GlobalTransactionalInterceptor implements MethodInterceptor:</p>
<p>io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> Object <span class="hljs-title">invoke</span><span class="hljs-params">(<span class="hljs-keyword">final</span> MethodInvocation methodInvocation)</span> <span class="hljs-keyword">throws</span> Throwable </span>{
  Class&lt;?&gt; targetClass = methodInvocation.getThis() != <span class="hljs-keyword">null</span> ? AopUtils.getTargetClass(methodInvocation.getThis()) : <span class="hljs-keyword">null</span>;
  Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
  <span class="hljs-keyword">final</span> Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);

  <span class="hljs-keyword">final</span> GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
  <span class="hljs-keyword">final</span> GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
  <span class="hljs-keyword">if</span> (globalTransactionalAnnotation != <span class="hljs-keyword">null</span>) {
    <span class="hljs-comment">// global transaction annotation</span>
    <span class="hljs-keyword">return</span> handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
  } <span class="hljs-keyword">else</span> <span class="hljs-keyword">if</span> (globalLockAnnotation != <span class="hljs-keyword">null</span>) {
    <span class="hljs-comment">// global lock annotation</span>
    <span class="hljs-keyword">return</span> handleGlobalLock(methodInvocation);
  } <span class="hljs-keyword">else</span> {
    <span class="hljs-keyword">return</span> methodInvocation.proceed();
  }
}
</code></pre>
<p>That is the logic executed by the proxied method. Inside it, handleGlobalTransaction() calls the TransactionalTemplate:</p>
<p>io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">private</span> Object <span class="hljs-title">handleGlobalTransaction</span><span class="hljs-params">(<span class="hljs-keyword">final</span> MethodInvocation methodInvocation,
                                       <span class="hljs-keyword">final</span> GlobalTransactional globalTrxAnno)</span> <span class="hljs-keyword">throws</span> Throwable </span>{
  <span class="hljs-keyword">try</span> {
    <span class="hljs-keyword">return</span> transactionalTemplate.execute(<span class="hljs-keyword">new</span> TransactionalExecutor() {
      <span class="hljs-meta">@Override</span>
      <span class="hljs-function"><span class="hljs-keyword">public</span> Object <span class="hljs-title">execute</span><span class="hljs-params">()</span> <span class="hljs-keyword">throws</span> Throwable </span>{
        <span class="hljs-keyword">return</span> methodInvocation.proceed();
      }
      <span class="hljs-meta">@Override</span>
      <span class="hljs-function"><span class="hljs-keyword">public</span> TransactionInfo <span class="hljs-title">getTransactionInfo</span><span class="hljs-params">()</span> </span>{
        <span class="hljs-comment">// ...</span>
      }
    });
  } <span class="hljs-keyword">catch</span> (TransactionalExecutor.ExecutionException e) {
    <span class="hljs-comment">// ...</span>
  }
}
</code></pre>
<p>What handleGlobalTransaction() actually runs is the execute method of the TransactionalTemplate class:</p>
<p>io.seata.tm.api.TransactionalTemplate#execute</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> Object <span class="hljs-title">execute</span><span class="hljs-params">(TransactionalExecutor business)</span> <span class="hljs-keyword">throws</span> Throwable </span>{
  <span class="hljs-comment">// 1. get or create a transaction</span>
  GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

  <span class="hljs-comment">// 1.1 get transactionInfo</span>
  TransactionInfo txInfo = business.getTransactionInfo();
  <span class="hljs-keyword">if</span> (txInfo == <span class="hljs-keyword">null</span>) {
    <span class="hljs-keyword">throw</span> <span class="hljs-keyword">new</span> ShouldNeverHappenException(<span class="hljs-string">"transactionInfo does not exist"</span>);
  }
  <span class="hljs-keyword">try</span> {

    <span class="hljs-comment">// 2. begin transaction</span>
    beginTransaction(txInfo, tx);

    Object rs = <span class="hljs-keyword">null</span>;
    <span class="hljs-keyword">try</span> {

      <span class="hljs-comment">// Do Your Business</span>
      rs = business.execute();

    } <span class="hljs-keyword">catch</span> (Throwable ex) {

      <span class="hljs-comment">// 3.the needed business exception to rollback.</span>
      completeTransactionAfterThrowing(txInfo,tx,ex);
      <span class="hljs-keyword">throw</span> ex;
    }

    <span class="hljs-comment">// 4. everything is fine, commit.</span>
    commitTransaction(tx);

    <span class="hljs-keyword">return</span> rs;
  } <span class="hljs-keyword">finally</span> {
    <span class="hljs-comment">//5. clear</span>
    triggerAfterCompletion();
    cleanUp();
  }
}
</code></pre>
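<p>Does this look familiar? It is exactly the boilerplate we write when using the API directly. With Spring's proxy support, that boilerplate is now wrapped in a template that runs the whole flow in one place, so you no longer have to write it out explicitly. If you are interested, the Mybatis-spring source code is also well worth reading.</p>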
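<p>Stripped of everything Seata-specific, the template pattern used here reduces to a few lines. The sketch below is an illustration only: TemplateSketch, STEPS, and execute() are hypothetical names, and recorded strings stand in for the real begin, commit, rollback, and cleanup calls.</p>
<pre><code class="language-java">import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class TemplateSketch {

    // records the lifecycle calls; a hypothetical stand-in for a real GlobalTransaction
    static final List&lt;String&gt; STEPS = new ArrayList&lt;&gt;();

    /** Runs the business logic between begin and commit, rolling back on failure. */
    static &lt;T&gt; T execute(Supplier&lt;T&gt; business) {
        STEPS.add("begin");
        try {
            T result;
            try {
                result = business.get();
            } catch (RuntimeException ex) {
                STEPS.add("rollback"); // business failed: roll back, then rethrow
                throw ex;
            }
            STEPS.add("commit"); // business succeeded
            return result;
        } finally {
            STEPS.add("cleanup"); // always runs, like step 5 of the template
        }
    }

    public static void main(String[] args) {
        System.out.println(execute(() -&gt; "ok"));
        try {
            execute(() -&gt; { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            System.out.println("rolled back: " + expected.getMessage());
        }
        System.out.println(STEPS);
    }
}
</code></pre>
<p>Callers supply only the business logic; the ordering of begin, commit or rollback, and cleanup lives in one place.</p>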
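<h1>Server-side processing logic</h1>
<p>When the server accepts a client connection, it naturally caches the channel as well. As mentioned earlier, the client sends a RegisterRMRequest/RegisterTMRequest to the server, and on receipt the server hands it to the ServerMessageListener:</p>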
<p>io.seata.core.rpc.ServerMessageListener</p>
<pre><code class="language-java"><span class="hljs-keyword">public</span> <span class="hljs-class"><span class="hljs-keyword">interface</span> <span class="hljs-title">ServerMessageListener</span> </span>{
  <span class="hljs-comment">// handles transaction messages such as branch registration, branch commit, branch report, and branch rollback</span>
  <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">onTrxMessage</span><span class="hljs-params">(RpcMessage request, ChannelHandlerContext ctx, ServerMessageSender sender)</span></span>;
  <span class="hljs-comment">// handles registration of an RM client connection</span>
  <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">onRegRmMessage</span><span class="hljs-params">(RpcMessage request, ChannelHandlerContext ctx,
                      ServerMessageSender sender, RegisterCheckAuthHandler checkAuthHandler)</span></span>;
  <span class="hljs-comment">// handles registration of a TM client connection</span>
  <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">onRegTmMessage</span><span class="hljs-params">(RpcMessage request, ChannelHandlerContext ctx,
                      ServerMessageSender sender, RegisterCheckAuthHandler checkAuthHandler)</span></span>;
  <span class="hljs-comment">// keeps the heartbeat between server and client</span>
  <span class="hljs-function"><span class="hljs-keyword">void</span> <span class="hljs-title">onCheckMessage</span><span class="hljs-params">(RpcMessage request, ChannelHandlerContext ctx, ServerMessageSender sender)</span></span>;

}
</code></pre>
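<p>ChannelManager is the server-side channel manager. Every time the server communicates with a client, it obtains that client's channel from ChannelManager. The cache structures it uses for TM and RM client channels are:</p>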
<pre><code class="language-java"><span class="hljs-comment">/**
 * resourceId -&gt; applicationId -&gt; ip -&gt; port -&gt; RpcContext
 */</span>
<span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">final</span> ConcurrentMap&lt;String, ConcurrentMap&lt;String, ConcurrentMap&lt;String, ConcurrentMap&lt;Integer,
RpcContext&gt;&gt;&gt;&gt;
  RM_CHANNELS = <span class="hljs-keyword">new</span> ConcurrentHashMap&lt;String, ConcurrentMap&lt;String, ConcurrentMap&lt;String, ConcurrentMap&lt;Integer,
RpcContext&gt;&gt;&gt;&gt;();

<span class="hljs-comment">/**
 * ip+appname,port
 */</span>
<span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">final</span> ConcurrentMap&lt;String, ConcurrentMap&lt;Integer, RpcContext&gt;&gt; TM_CHANNELS
  = <span class="hljs-keyword">new</span> ConcurrentHashMap&lt;String, ConcurrentMap&lt;Integer, RpcContext&gt;&gt;();
</code></pre>
<p>These Map structures are a little involved:</p>
<p>RM_CHANNELS:</p>
<ol>
<li>resourceId is the database address of the RM client;</li>
<li>applicationId is the service ID of the RM client; for example, with the Spring Boot setting spring.application.name=account-service, the applicationId is account-service;</li>
<li>ip is the IP address of the RM client;</li>
<li>port is the port of the RM client;</li>
<li>RpcContext holds the information from this registration request.</li>
</ol>
<p>TM_CHANNELS:</p>
<ol>
<li>ip+appname: the comment appears to be wrong here and should read appname+ip, that is, the first key of the TM_CHANNELS Map is appname+ip;</li>
<li>port: the client's port number.</li>
</ol>
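<p>To make the four-level RM_CHANNELS layout concrete, here is a small self-contained sketch of registering and looking up an entry. It is an illustration under stated assumptions: a String stands in for RpcContext, register() and lookup() are hypothetical helpers, and the resource ID is a made-up JDBC URL.</p>
<pre><code class="language-java">import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ChannelRegistrySketch {

    // resourceId -&gt; applicationId -&gt; ip -&gt; port -&gt; context (a String stands in for RpcContext)
    static final ConcurrentMap&lt;String, ConcurrentMap&lt;String, ConcurrentMap&lt;String, ConcurrentMap&lt;Integer, String&gt;&gt;&gt;&gt;
        RM_CHANNELS = new ConcurrentHashMap&lt;&gt;();

    /** Registers a context, creating each missing map level atomically. */
    static void register(String resourceId, String applicationId, String ip, int port, String context) {
        RM_CHANNELS.computeIfAbsent(resourceId, k -&gt; new ConcurrentHashMap&lt;&gt;())
            .computeIfAbsent(applicationId, k -&gt; new ConcurrentHashMap&lt;&gt;())
            .computeIfAbsent(ip, k -&gt; new ConcurrentHashMap&lt;&gt;())
            .put(port, context);
    }

    /** Walks the four levels and returns null as soon as one level is missing. */
    static String lookup(String resourceId, String applicationId, String ip, int port) {
        ConcurrentMap&lt;String, ConcurrentMap&lt;String, ConcurrentMap&lt;Integer, String&gt;&gt;&gt; byApp =
            RM_CHANNELS.get(resourceId);
        if (byApp == null) {
            return null;
        }
        ConcurrentMap&lt;String, ConcurrentMap&lt;Integer, String&gt;&gt; byIp = byApp.get(applicationId);
        if (byIp == null) {
            return null;
        }
        ConcurrentMap&lt;Integer, String&gt; byPort = byIp.get(ip);
        return byPort == null ? null : byPort.get(port);
    }

    public static void main(String[] args) {
        String resourceId = "jdbc:mysql://127.0.0.1:3306/account"; // made-up resource ID
        register(resourceId, "account-service", "127.0.0.1", 63353, "ctx-1");
        System.out.println(lookup(resourceId, "account-service", "127.0.0.1", 63353));
        System.out.println(lookup(resourceId, "order-service", "127.0.0.1", 63353));
    }
}
</code></pre>
<p>Keying by resource first means the server can find every client that manages a given database without scanning all connections.</p>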
<p>The RM client registration logic is as follows:</p>
<p>io.seata.core.rpc.ChannelManager#registerRMChannel</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">registerRMChannel</span><span class="hljs-params">(RegisterRMRequest resourceManagerRequest, Channel channel)</span>
  <span class="hljs-keyword">throws</span> IncompatibleVersionException </span>{
  Version.checkVersion(resourceManagerRequest.getVersion());
  <span class="hljs-comment">// put the ResourceIds (database connection info) into a set</span>
  Set&lt;String&gt; dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
  RpcContext rpcContext;
  <span class="hljs-comment">// check whether this channel is already in the cache</span>
  <span class="hljs-keyword">if</span> (!IDENTIFIED_CHANNELS.containsKey(channel)) {
    <span class="hljs-comment">// build the rpcContext from the registration request</span>
    rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
                                    resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
                                    resourceManagerRequest.getResourceIds(), channel);
    <span class="hljs-comment">// put the rpcContext into the cache</span>
    rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
  } <span class="hljs-keyword">else</span> {
    rpcContext = IDENTIFIED_CHANNELS.get(channel);
    rpcContext.addResources(dbkeySet);
  }
  <span class="hljs-keyword">if</span> (<span class="hljs-keyword">null</span> == dbkeySet || dbkeySet.isEmpty()) { <span class="hljs-keyword">return</span>; }
  <span class="hljs-keyword">for</span> (String resourceId : dbkeySet) {
    String clientIp;
    <span class="hljs-comment">// store the request info in RM_CHANNELS, using Java 8's computeIfAbsent</span>
    ConcurrentMap&lt;Integer, RpcContext&gt; portMap = RM_CHANNELS.computeIfAbsent(resourceId, resourceIdKey -&gt; <span class="hljs-keyword">new</span> ConcurrentHashMap&lt;&gt;())
      .computeIfAbsent(resourceManagerRequest.getApplicationId(), applicationId -&gt; <span class="hljs-keyword">new</span> ConcurrentHashMap&lt;&gt;())
      .computeIfAbsent(clientIp = getClientIpFromChannel(channel), clientIpKey -&gt; <span class="hljs-keyword">new</span> ConcurrentHashMap&lt;&gt;());
    <span class="hljs-comment">// put the current rpcContext into portMap</span>
    rpcContext.holdInResourceManagerChannels(resourceId, portMap);
    updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
  }
}
</code></pre>
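<p>As the code shows, registering an RM client mainly stores the registration request in the RM_CHANNELS cache. It also checks IDENTIFIED_CHANNELS to see whether the channel of this request has already been identified. IDENTIFIED_CHANNELS is structured as follows:</p>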
<pre><code class="language-java"><span class="hljs-keyword">private</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">final</span> ConcurrentMap&lt;Channel, RpcContext&gt; IDENTIFIED_CHANNELS
  = <span class="hljs-keyword">new</span> ConcurrentHashMap&lt;&gt;();
</code></pre>
<p>IDENTIFIED_CHANNELS contains every registered TM and RM channel.</p>
<p>The TM registration logic is as follows:</p>
<p>io.seata.core.rpc.ChannelManager#registerTMChannel</p>
<pre><code class="language-java"><span class="hljs-function"><span class="hljs-keyword">public</span> <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> <span class="hljs-title">registerTMChannel</span><span class="hljs-params">(RegisterTMRequest request, Channel channel)</span>
  <span class="hljs-keyword">throws</span> IncompatibleVersionException </span>{
  Version.checkVersion(request.getVersion());
  <span class="hljs-comment">// build the RpcContext from the registration request</span>
  RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(),
                                             request.getApplicationId(),
                                             request.getTransactionServiceGroup(),
                                             <span class="hljs-keyword">null</span>, channel);
  <span class="hljs-comment">// put the RpcContext into the IDENTIFIED_CHANNELS cache</span>
  rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
  <span class="hljs-comment">// e.g. account-service:127.0.0.1 (the port, such as 63353, is the key of the inner map)</span>
  String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR
    + getClientIpFromChannel(channel);
  <span class="hljs-comment">// make sure TM_CHANNELS has an entry for this client</span>
  TM_CHANNELS.putIfAbsent(clientIdentified, <span class="hljs-keyword">new</span> ConcurrentHashMap&lt;Integer, RpcContext&gt;());
  <span class="hljs-comment">// fetch the map created above, then put the rpcContext into it</span>
  ConcurrentMap&lt;Integer, RpcContext&gt; clientIdentifiedMap = TM_CHANNELS.get(clientIdentified);
  rpcContext.holdInClientChannels(clientIdentifiedMap);
}
</code></pre>
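<p>TM client registration is broadly similar: the registration information is stored in the corresponding caches. It is simpler than RM client registration, mainly because an RM client also carries branch transaction resource information and so has more to register than a TM client.</p>
<p>The source code analysis above is based on version 0.9.0.</p>
<h1>About the author</h1>
<p>Zhang Chenghui (张乘辉) currently works as a Java engineer in the Technology Platform Department of the ZTO Technology Information Center, where he is mainly responsible for developing ZTO's messaging platform and its full-link stress testing project. He enjoys sharing technical knowledge, writes the WeChat official account "后端进阶", runs the technical blog (<a href="https://objcoding.com/">https://objcoding.com/</a>), and is a Seata contributor. GitHub ID: objcoding.</p>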
</section><footer class="footer-container"><div class="footer-body"><img src="/img/seata_logo_gray.png"/><p class="docsite-power">website powered by docsite</p><div class="cols-container"><div class="col col-12"><h3>Vision</h3><p>Seata is an open-source distributed transaction solution from Alibaba, dedicated to providing high-performance, easy-to-use distributed transaction services under a microservices architecture.</p></div><div class="col col-6"><dl><dt>Docs</dt><dd><a href="/zh-cn/docs/overview/what-is-seata.html" target="_self">What is Seata?</a></dd><dd><a href="/zh-cn/docs/user/quickstart.html" target="_self">Quick Start</a></dd><dd><a href="https://github.com/seata/seata.github.io/issues/new" target="_self">Report a documentation issue</a></dd><dd><a href="https://github.com/seata/seata.github.io" target="_self">Edit this document on GitHub</a></dd></dl></div><div class="col col-6"><dl><dt>Resources</dt><dd><a href="/zh-cn/blog/index.html" target="_self">Blog</a></dd><dd><a href="/zh-cn/community/index.html" target="_self">Community</a></dd></dl></div></div><div class="copyright"><span>Copyright © 2019 Seata</span></div></div></footer></div></div>
	<script src="https://f.alicdn.com/react/15.4.1/react-with-addons.min.js"></script>
	<script src="https://f.alicdn.com/react/15.4.1/react-dom.min.js"></script>
	<script>
		window.rootPath = '';
  </script>
	<script src="/build/blogDetail.js"></script>
	<script>
    var _hmt = _hmt || [];
    (function() {
      var hm = document.createElement("script");
      hm.src = "https://hm.baidu.com/hm.js?104e73ef0c18b416b27abb23757ed8ee";
      var s = document.getElementsByTagName("script")[0];
      s.parentNode.insertBefore(hm, s);
    })();
    </script>
</body>
</html>
